package com.rtsapp.server.domain.mysql.sql.impl;

import com.rtsapp.server.domain.mysql.sql.*;
import com.rtsapp.server.domain.mysql.sql.impl.op.PingTask;
import com.rtsapp.server.domain.mysql.sql.impl.op.PreparedStatementTask;
import com.rtsapp.server.domain.mysql.sql.impl.op.TransactionTask;
import com.rtsapp.server.logger.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;


public class DatabaseWorkerPoolImpl implements DatabaseWorkerPool {

    private static final Logger LOG =com.rtsapp.server.logger.LoggerFactory.getLogger(DatabaseWorkerPoolImpl.class);

    /**
     * 异步, 同步 数量常量标志
     */
    private static final int IDX_ASYNC = 0;
    private static final int IDX_SYNCH = 1;
    private static final int IDX_SIZE = 2;

    /**
     * 数据库写队列
     */
    private final  BlockingQueue<SQLOperation> sqlQueues[];

    /**
     * 异步查询的任务线程
     * 1. 异步查询的线程是单独的, 不和数据库写队列共用, 主要是因为
     *      1. 异步查询都需要返回结果, 因此需要跟踪Future, 并得到结果, 而大部分的写操作是没有这个需求的, 减少Future对象, 能节约内存
     *      2. 异步查询的时效性有要求, 但是写入队列可以慢慢写, 为了不让写入任务加大查询的延迟, 使用独立的异步查询队列
     *      3. 查询没有先后顺序，怎么查都行
     */
    private final ExecutorService asyncQueryExecutor;

    /**
     * 数据库连接
     */
    private final List<List<MySQLConnection>> connections = new ArrayList<>(IDX_SIZE);

    private final int[] connectionCount = new int[IDX_SIZE];
    /**
     * 数据库连接信息
     */
    private final  MySQLConnectionInfo connectionInfo;
    /**
     * 异步线程数
     */
    private final  int async_threads;
    /**
     * 同步线程数
     */
    private final  int synch_threads;


    public DatabaseWorkerPoolImpl(int capacity, MySQLConnectionInfo connInfo ) {

        connectionInfo = connInfo;//数据库连接信息

        async_threads = connInfo.getAsyncCount();//非阻塞
        synch_threads = connInfo.getCount();//阻塞

        sqlQueues = new BlockingQueue[async_threads];
        for( int i = 0; i < async_threads; i++ ) {
            sqlQueues[ i ] = new ArrayBlockingQueue<>(capacity);
        }

        asyncQueryExecutor = Executors.newFixedThreadPool( connectionInfo.getQueryThread(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread( "MySQLAsyncQuery" );
            }
        } );

    }

    /**
     * 开启阻塞连接和非阻塞连接
     */
    @Override
    public boolean open() {

        if (!openConnections(IDX_ASYNC, async_threads)) { //开启指定数量个非阻塞数据库连接
            return false;
        }


        if (!openConnections(IDX_SYNCH, synch_threads)) { //开启指定数量个阻塞数据库连接
            return false;
        }

        return true;

    }



    /**
     * 关闭阻塞连接和非阻塞连接
     */
    @Override
    public void close() {


        //1. 写线程关闭
        int totalTask = 0;
        while ( ( totalTask = getAsyncQueueSize() ) > 0 ) {
            LOG.info("队列中还有" + totalTask + "条数据");
            try {
                LOG.info("休眠5秒,等待数据保存完毕");
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                LOG.error("close 时, 休眠被中断", e);
            }
        }
        LOG.info("队列中已无数据");



        //2. 查询线程关闭
        LOG.info("关闭异步查询线程");
        asyncQueryExecutor.shutdown();


        //3. 依次关闭数据库连接
        LOG.info("关闭所有数据库连接");
        for (byte i = 0; i < connections.get(IDX_ASYNC).size(); ++i) {
            MySQLConnection t = connections.get(IDX_ASYNC).get(i);
            t.stop();
        }

        for (byte i = 0; i < connections.get(IDX_SYNCH).size(); ++i) {
            connections.get(IDX_SYNCH).get(i).stop();
        }


    }

    /**
     * 初始化所有线程的stmt
     */
    @Override
    public boolean prepareStatement(String sqlIndex, String sql) {

        for (byte i = 0; i < IDX_SIZE; ++i)
            for (int c = 0; c < connections.get(i).size(); ++c) {
                MySQLConnection t = connections.get(i).get(c);

                // 此时可能其他线程正在使用它, 不断尝试，直到成功
                // 建议在所有模块全部启动前, 不要在其他线程进行数据库操作
                while( ! t.lockIfReady() ) {
                    LOG.error("prepareStatement 锁定连接, 进行预编译失败, 将重新尝试:" + t);
                }

                if (!t.prepareStatement(sqlIndex, sql)) {
                    t.unlock();
                    close();
                    return false;
                }
                t.unlock();
            }
        return true;
    }



    /**
     * 将stmt加入队列执行增删改
     */
    @Override
    public void execute(MySQLPreparedStatement stmt) {
        enqueue( 0, new PreparedStatementTask(stmt) );
    }

    /**
     * 将stmt加入队列执行增删改
     */
    @Override
    public void execute(MySQLPreparedStatement stmt, int queueIndex ) {
        enqueue(queueIndex, new PreparedStatementTask(stmt));
    }
    /**
     * 获取阻塞连接立即执行增删改
     */
    @Override
    public boolean directExecute(MySQLPreparedStatement stmt) {
        MySQLConnection t = getFreeConnection();
        boolean isOK = t.execute(stmt);
        t.unlock();
        return isOK;
    }

    /**
     * 获取阻塞连接立即执行查询
     */
    @Override
    public void query(MySQLPreparedStatement stmt, MySQLResultSetHandler handler) {
        MySQLConnection t = getFreeConnection();
        t.query(stmt, handler);
        t.unlock();
    }

    /**
     * 异步查询
     * @param stmt
     * @param handler
     * @return
     */
    @Override
    public Future<Object> asyncQuery( MySQLPreparedStatement stmt, MySQLResultSetFutureHandler handler  ){

        Callable<Object> queryTask = new Callable<Object>() {

            @Override
            public Object call() throws Exception {

                MySQLConnection t = getFreeConnection();
                t.query(stmt, handler);
                t.unlock();

                return handler.getResult();
            }
        };


        return asyncQueryExecutor.submit( queryTask );
    }

    /**
     * 开启事物
     */
    @Override
    public MySQLTransaction getTransaction() {
        return new MySQLTransaction();
    }

    /**
     * 提交事物
     */
    @Override
    public void executeTransaction(MySQLTransaction transaction) {
        enqueue( 0, new TransactionTask(transaction) );
    }

    @Override
    public void executeTransaction(MySQLTransaction transaction, int queueIndex ) {
        enqueue( queueIndex, new TransactionTask(transaction) );
    }


    /**
     * 直接执行stmt或者追加到事物中
     */
    @Override
    public void executeOrAppend(MySQLTransaction trans, MySQLPreparedStatement stmt) {
        if (trans == null) {
            execute(stmt);
        } else {
            trans.append(stmt);
        }
    }


    @Override
    public MySQLPreparedStatement getPreparedStatement(String sqlIndex) {
        return new MySQLPreparedStatement(sqlIndex);
    }

    /**
     * 获取连接, 这个方法除非不得已请不要调用
     *
     * @return
     */
    public MySQLConnection getConnection() {
        return getFreeConnection();
    }


    /**
     * 关闭连接, 这个方法和方面的方法一致, 记得一定要释放连接
     *
     * @param conn
     */
    public void releaseConnection(MySQLConnection conn) {
        conn.unlock();
    }


    /**
     * 开启指定数量指定类型的数据库连接
     */
    private boolean openConnections(int type, int numConnections) {

        if (connections.size() < type + 1) {
            connections.add(null);
        }

        if (connections.get(type) == null) {
            connections.set(type, (new ArrayList<>(numConnections)));
        }


        for (byte i = 0; i < numConnections; ++i) {

            MySQLConnection t = null;

            if (type == IDX_ASYNC) {
                t = new MySQLConnection(sqlQueues[i], connectionInfo, i );
            } else if (type == IDX_SYNCH) {
                t = new MySQLConnection(connectionInfo);
            } else {
                assert (false);
            }

            connections.get(type).add(t);
            ++connectionCount[type];


            if (!t.open()) {
                while (connectionCount[type] != 0) {
                    connections.get(type).remove(i--);
                    --connectionCount[type];
                }

                return false;
            }

        }

        return true;
    }

    /**
     * 将操作加入队列尾
     * @param queueIndex 选择使用哪个队列
     */
    private void enqueue(int queueIndex, SQLOperation op) {

        if( async_threads < 1  ){
            LOG.error("异步队列数量为0, 但是依然有入队操作, 请检查代码: queueIndex=" + queueIndex + ", op=" + op);
            return;
        }

        try {
            queueIndex = ( queueIndex >= 0 ) ? (queueIndex % async_threads) : (-queueIndex % async_threads);
            sqlQueues[ queueIndex ].put(op);
        } catch (InterruptedException e) {
            LOG.error("enqueue error:queueIndex=" + queueIndex, e);
        }
    }

    /**
     * 获取阻塞连接
     */
    private MySQLConnection getFreeConnection() {
        byte i = 0;
        int num_cons = connectionCount[IDX_SYNCH];
        MySQLConnection t = null;

        for (; ; ) {

            t = connections.get(IDX_SYNCH).get(++i % num_cons);

            if (t.lockIfReady()) {
                break;
            }
        }

        return t;
    }

    public void keepAlive() {

        //! Ping synchronous connections
        for (byte i = 0; i < connectionCount[IDX_SYNCH]; ++i) {
            MySQLConnection t = connections.get(IDX_SYNCH).get(i);
            if (t.lockIfReady()) {
                t.ping();
                t.unlock();
            }
        }

        //! Assuming all worker threads are free, every worker thread will receive 1 ping operation request
        //! If one or more worker threads are busy, the ping operations will not be split evenly, but this doesn't matter
        //! as the sole purpose is to prevent connections from idling.
        for (int i = 0; i < connections.get(IDX_ASYNC).size(); ++i) {
            enqueue( i, new PingTask());
        }

    }


    /**
     * @return 返回当前所有异步队列的中数据的总个数
     */
    @Override
    public int getAsyncQueueSize() {
        int totalTask = 0;
        for( int i = 0; i < async_threads; i++ ){
            totalTask += sqlQueues[ i ].size();
        }
        return totalTask;
    }

}
